// Copyright (c) 2022 by Duguang.IO Inc. All Rights Reserved.
// Author: Ethan Liu
// Date: 2022-05-21 12:05:05

package command

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	rt "runtime"
	"strconv"
	"strings"
	"syscall"
	"time"

	"jianmu-worker-docker/client"
	"jianmu-worker-docker/engine"
	"jianmu-worker-docker/poller"
	"jianmu-worker-docker/runtime"

	"gitee.com/jianmu-workers/worker-go/server"

	"github.com/hashicorp/serf/cmd/serf/command/agent"
	"github.com/joho/godotenv"
	c "github.com/mitchellh/cli"
	"github.com/sirupsen/logrus"
	"golang.org/x/sync/errgroup"
)

// emptyContext is the background context from which the daemon derives its
// cancellable contexts.
var emptyContext = context.Background()

// DaemonCmd is the command that runs the worker as a long-lived daemon; its
// entry point is Run.
type DaemonCmd struct {
	// Envfile is an optional path to an environment-variable file. When it is
	// non-empty, Run loads it before reading the configuration from the
	// environment. The struct tags declare it as an optional positional
	// argument named "envfile" (presumably for the CLI parser — confirm).
	Envfile string `arg:"" optional:"" name:"envfile" help:"load the environment variable file." type:"path"`
}

// checkCpus validates the per-container CPU limit configured through
// JIANMU_WORKER_CONTAINER_CPUS.
//
// A value of exactly 0 is accepted without further checks (presumably meaning
// "no limit" — confirm against the engine). Any other value must be at least
// 0.01 and must not exceed the number of logical CPUs reported by
// runtime.NumCPU.
//
// It returns nil when the value is acceptable and a descriptive error
// otherwise.
func checkCpus(cpus float64) error {
	const minCpus = 0.01

	if cpus == 0 {
		return nil
	}

	if cpus < minCpus {
		return errors.New("JIANMU_WORKER_CONTAINER_CPUS配置错误,最小值是0.01")
	}

	// The limit cannot exceed the number of cores available on this machine.
	// The format string is a constant so that vet's printf check applies and
	// no pre-formatted text is ever re-interpreted as a format.
	if maxCpus := rt.NumCPU(); cpus > float64(maxCpus) {
		return fmt.Errorf("JIANMU_WORKER_CONTAINER_CPUS配置错误,最大值是本机CPU的核心数,本机CPU核心数是%d ", maxCpus)
	}

	return nil
}

// parseMemory converts the JIANMU_WORKER_CONTAINER_MEMORY setting into a
// number of bytes.
//
// The accepted format is <number>[<unit>], where unit is one of b, k, m or g
// (case-insensitive, powers of 1024). When the unit is omitted the number is
// taken as bytes. The empty string and "0" both yield 0 with no error.
//
// It returns an error when the number cannot be parsed, the unit is unknown,
// the value does not fit in an int64, or the resulting size is below the 4M
// minimum enforced by checkMemory.
func parseMemory(mem string) (int64, error) {
	if mem == "" || mem == "0" {
		return 0, nil
	}

	const maxInt64 = 1<<63 - 1

	// The unit suffix is optional: a value ending in a digit is plain bytes.
	digits, unit := mem, "b"
	if last := mem[len(mem)-1]; last < '0' || last > '9' {
		digits = mem[:len(mem)-1]
		unit = strings.ToLower(mem[len(mem)-1:])
	}

	num, err := strconv.ParseInt(digits, 10, 64)
	if err != nil {
		return 0, errors.New("JIANMU_WORKER_CONTAINER_MEMORY配置错误,eg 1000M 3g")
	}

	// Translate the unit (b, k, m or g) into a byte multiplier.
	var multiplier int64
	switch unit {
	case "b":
		multiplier = 1
	case "k":
		multiplier = 1 << 10
	case "m":
		multiplier = 1 << 20
	case "g":
		multiplier = 1 << 30
	default:
		return 0, errors.New("JIANMU_WORKER_CONTAINER_MEMORY配置错误,Memory limit (format: <number>[<unit>]). " +
			"Number is a positive integer. Unit can be one of b, k, m, or g. Minimum is 4M")
	}

	// Negative values are always below the minimum; reject them before
	// multiplying so that a wrapped product can never look valid.
	if num < 0 {
		return checkMemory(num)
	}
	// Guard the multiplication: int64 overflow wraps silently in Go and could
	// otherwise turn an absurdly large setting into a small, accepted one.
	if num > maxInt64/multiplier {
		return 0, errors.New("JIANMU_WORKER_CONTAINER_MEMORY配置错误,value overflows int64")
	}

	return checkMemory(num * multiplier)
}

// checkMemory enforces the minimum memory limit of 4M (4*1024*1024 bytes).
// It returns mem unchanged when it is large enough, and 0 with an error
// otherwise.
func checkMemory(mem int64) (int64, error) {
	const minMemory = 4 * 1024 * 1024

	if mem < minMemory {
		return 0, errors.New("JIANMU_WORKER_CONTAINER_MEMORY配置错误,最小值是4M")
	}

	return mem, nil
}

// startSerf builds the serf agent command line from config and runs the agent
// on the calling goroutine.
//
// The node name, bind address and RPC address come from config.Worker.ID and
// config.Node. The worker's role, platform, container limits, capacity and
// server address are published as serf tags. The cpus tag is formatted with
// zero decimal places. An advertise address and a comma-separated list of
// join addresses are appended when configured.
//
// NOTE(review): ctx is not used here and the value returned by the agent's Run
// is discarded, so this method always returns nil.
func (d *DaemonCmd) startSerf(ctx context.Context, config Config) error {
	// Tags published by this node, in the order they are passed to serf.
	tags := []string{
		"role=worker",
		"os=" + config.Platform.OS,
		"arch=" + config.Platform.Arch,
		"cpus=" + strconv.FormatFloat(config.ContainerConfig.Cpus, 'f', 0, 64),
		"memory=" + config.ContainerConfig.Memory,
		"capacity=" + strconv.Itoa(config.Worker.Capacity),
		"srv_address=" + config.Client.Address,
	}

	serfArgs := []string{
		"-node=" + config.Worker.ID,
		"-bind=" + config.Node.Bind,
		"-rpc-addr=" + config.Node.Rpc_addr,
		"-snapshot=snapshot",
		"-rejoin=true",
	}
	for _, tag := range tags {
		serfArgs = append(serfArgs, "-tag", tag)
	}

	if advertise := config.Node.Advertise; advertise != "" {
		serfArgs = append(serfArgs, "-advertise="+strings.TrimSpace(advertise))
	}

	if config.Node.Join != "" {
		for _, addr := range strings.Split(config.Node.Join, ",") {
			if addr == "" {
				continue
			}
			serfArgs = append(serfArgs, "-join="+strings.TrimSpace(addr))
		}
	}

	serf := &agent.Command{
		Ui:         &c.BasicUi{Writer: os.Stdout},
		ShutdownCh: make(chan struct{}),
	}
	serf.Run(serfArgs)
	return nil
}

// run starts every component of the worker daemon and blocks until the
// background tasks tracked by the error group have finished.
//
// In order, it:
//  1. creates the docker engine and pings it once per second until it answers;
//  2. validates the container memory and CPU limits from config;
//  3. starts the local server and, when config.Node.Enable is set, the serf
//     agent, each in its own goroutine;
//  4. pings the remote server until it answers, bounded by
//     config.Worker.RegisterTimeout seconds (0 means no bound), then joins it
//     and marks the worker online;
//  5. looks up containers reported by the engine and resumes those the remote
//     server still knows about;
//  6. starts the poller and waits for the goroutines from steps 3 and 6.
//
// It returns the first configuration, registration or background-task error.
// While waiting for registration, an expired deadline is reported as a
// timeout error naming the server address, whereas a cancelled context is
// returned as the context's own error.
//
// NOTE(review): a failure to create the docker engine is logged with Fatalln
// rather than returned.
func (d *DaemonCmd) run(ctx context.Context, config Config) error {
	opts := engine.Opts{
		HidePull: false,
	}
	dockercli, err := engine.NewEnv(opts)
	if err != nil {
		logrus.WithError(err).
			Fatalln("cannot load the docker engine")
	}

	// Wait for the docker daemon to become reachable.
	for {
		err := dockercli.Ping(ctx)
		if err == context.Canceled {
			// Stop pinging; the registration loop below observes the
			// cancelled context and returns its error.
			break
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		if err != nil {
			logrus.WithError(err).
				Errorln("cannot ping the docker daemon")
			time.Sleep(time.Second)
		} else {
			logrus.Debugln("successfully pinged the docker daemon")
			break
		}
	}
	cli := client.New(
		config.Client.Address,
		config.Client.Secret,
		false,
		config.Worker.ID,
	)

	memory, err := parseMemory(config.ContainerConfig.Memory)
	if err != nil {
		return err
	}

	err = checkCpus(config.ContainerConfig.Cpus)
	if err != nil {
		return err
	}

	worker := &runtime.Worker{
		Client: cli,
		Engine: dockercli,
		ID:     config.Worker.ID,
		Name:   config.Worker.Name,
		Type:   config.Worker.Type,
		Config: &runtime.ContainerConfig{
			Cpus:        config.ContainerConfig.Cpus,
			Memory:      memory,
			NetworkMode: config.ContainerConfig.NetworkMode,
			Timeout:     config.ContainerConfig.Timeout,
		},
	}
	// Named so that the locals do not shadow the poller and server packages.
	taskPoller := &poller.Poller{
		Client:   cli,
		Dispatch: worker.Run,
		Filter:   nil,
	}
	var g errgroup.Group
	srv := server.Server{
		Addr:   config.Server.Port,
		Docker: dockercli,
	}

	logrus.WithField("addr", config.Server.Port).
		Infoln("starting the server")

	g.Go(func() error {
		return srv.ListenAndServe(ctx)
	})

	if config.Node.Enable {
		logrus.Infoln("starting the serf")
		g.Go(func() error {
			return d.startSerf(ctx, config)
		})
	}

	timeOut := time.Duration(config.Worker.RegisterTimeout) * time.Second
	// A timeout of 0 means "keep pinging indefinitely": use the largest
	// representable duration.
	if timeOut == 0 {
		timeOut = 1<<63 - 1
	}

	registerCtx, cancelFunc := context.WithTimeout(ctx, timeOut)
	defer cancelFunc()

	// Ping the remote server until it answers.
	for {
		err := cli.Ping(registerCtx)
		select {
		case <-registerCtx.Done():
			// Report a timeout only when the deadline really expired; a
			// cancellation (for example on shutdown) is returned as-is.
			if errors.Is(registerCtx.Err(), context.DeadlineExceeded) {
				return fmt.Errorf("注册到%s超时", config.Client.Address)
			}
			return registerCtx.Err()
		default:
		}
		if err != nil {
			logrus.WithError(err).
				WithField("address", config.Client.Address).
				Errorln("cannot ping the remote server")
			time.Sleep(time.Second)
		} else {
			logrus.WithField("address", config.Client.Address).
				Debugln("successfully pinged the remote server")
			break
		}
	}

	w := &engine.Worker{
		ID:       config.Worker.ID,
		Name:     config.Worker.Name,
		Type:     config.Worker.Type,
		Tag:      config.Worker.Tags,
		Os:       config.Platform.OS,
		Arch:     config.Platform.Arch,
		Capacity: config.Worker.Capacity,
	}

	if err := cli.Join(registerCtx, w); err != nil {
		logrus.WithError(err).
			WithField("address", config.Client.Address).
			Errorln("cannot join the server")
		return err
	}

	if err := cli.Online(registerCtx); err != nil {
		logrus.WithError(err).
			WithField("address", config.Client.Address).
			Errorln("cannot online the server")
		return err
	}

	// Resume leftover tasks. A lookup failure is not fatal: the daemon simply
	// starts without resuming anything, but the cause is kept in the log.
	cs, err := worker.Engine.FindContainers(ctx)
	if err != nil {
		logrus.WithError(err).Debugln("获取遗留任务失败")
	}
	for i := 0; i < len(cs); i++ {
		r, err := cli.FindById(ctx, cs[i])
		if err != nil {
			logrus.Debugf("获取遗留任务状态失败: %v \n", err)
		} else {
			go worker.Resume(ctx, r)
		}
	}

	g.Go(func() error {
		logrus.WithField("capacity", config.Worker.Capacity).
			WithField("endpoint", config.Client.Address).
			WithField("os", config.Platform.OS).
			WithField("arch", config.Platform.Arch).
			Infoln("polling the remote server")

		taskPoller.Poll(ctx, config.Worker.Capacity)
		return nil
	})

	err = g.Wait()
	if err != nil {
		logrus.WithError(err).
			Errorln("shutting down the server")
	}
	return err
}

// Run is the entry point of the daemon command.
//
// It optionally loads environment variables from d.Envfile, reads the
// configuration from the environment, configures the global logger, and then
// calls run with a context that is cancelled when a termination signal
// arrives. Before cancelling, the signal callback creates a client and calls
// its Offline method.
//
// It returns any error from loading the env file, reading the configuration,
// or from run.
func (d *DaemonCmd) Run() error {
	// Load environment variables from file.
	if d.Envfile != "" {
		if err := godotenv.Load(d.Envfile); err != nil {
			return err
		}
	}

	// Load the configuration from the environment.
	config, err := FromEnviron()
	if err != nil {
		return err
	}

	// Set up the global logrus logger.
	setupLogger(config)

	baseCtx, cancel := context.WithCancel(emptyContext)
	defer cancel()

	// Listen for termination signals to shut down gracefully. The callback
	// uses baseCtx, which is never reassigned, rather than a variable that is
	// written after WithContextFunc has already started its goroutine. baseCtx
	// is still live when the callback runs because cancel is called last.
	ctx := WithContextFunc(baseCtx, func() {
		cli := client.New(
			config.Client.Address,
			config.Client.Secret,
			false,
			config.Worker.ID,
		)
		// NOTE(review): any result of Offline is not inspected here.
		cli.Offline(baseCtx)
		println("received signal, terminating process")
		cancel()
	})

	return d.run(ctx, config)
}

// setupLogger sets the global logrus level from config. Debug selects the
// debug level and Trace selects the trace level; Trace is applied last, so it
// wins when both flags are set. With neither flag the level is left untouched.
func setupLogger(config Config) {
	overrides := []struct {
		enabled bool
		level   logrus.Level
	}{
		{config.Debug, logrus.DebugLevel},
		{config.Trace, logrus.TraceLevel},
	}

	for _, o := range overrides {
		if o.enabled {
			logrus.SetLevel(o.level)
		}
	}
}

// WithContextFunc returns a context derived from ctx that is cancelled after
// the process receives SIGINT or SIGTERM. When such a signal arrives, f is
// called first and the returned context is cancelled afterwards. If ctx is
// done before any signal arrives, f is not called and the watcher goroutine
// exits.
//
// The signal handler is registered before WithContextFunc returns, so a
// signal delivered immediately afterwards is still routed to f.
func WithContextFunc(ctx context.Context, f func()) context.Context {
	ctx, cancel := context.WithCancel(ctx)

	// Register synchronously: doing this inside the goroutine would leave a
	// window in which a signal still triggers the default action.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		// Deferred calls run last-in first-out: cancel first, then stop
		// signal delivery.
		defer signal.Stop(sigCh)
		defer cancel()

		select {
		case <-ctx.Done():
		case <-sigCh:
			f()
		}
	}()

	return ctx
}
